# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name, import-self, unused-argument, unused-variable, inconsistent-return-statements
"""CoreML frontend."""
from __future__ import absolute_import as _abs
import numpy as np
import tvm
from .. import ir_pass
from .. import expr as _expr
from .. import op as _op
from ... import nd as _nd
from ..._ffi import base as _base
from .common import ExprTable

__all__ = ['from_coreml']


def _NeuralNetworkImageScaler(op, inexpr, etab):
    """Scale the input image by ``channelScale`` and add per-channel biases.

    The biases are laid out (blue, green, red) as a (3, 1, 1) constant so
    they broadcast over the spatial dimensions.
    """
    bias_array = np.array([op.blueBias, op.greenBias, op.redBias]).reshape([3, 1, 1])
    bias_expr = etab.new_const(bias_array)
    scaled = _op.multiply(inexpr, _expr.const(op.channelScale, dtype='float32'))
    return _op.add(scaled, bias_expr)


def _NeuralNetworkMeanImage(op, inexpr, etab):
    """Subtract the preprocessing mean image from the input."""
    mean = _expr.const(op.meanImage, dtype='float32')
    return _op.subtract(inexpr, mean)


def _ConvolutionLayerParams(op, inexpr, etab):
    """Convert a CoreML convolution (or deconvolution) layer.

    Builds the weight constant from the layer protobuf, resolves the
    'valid'/'same' padding scheme, folds in any pending padding layer
    recorded on ``etab``, and emits ``conv2d``/``conv2d_transpose`` plus
    an optional bias add.

    Raises
    ------
    tvm.error.OpAttributeUnimplemented
        If the padding type is neither 'valid' nor 'same'.
    """
    # Weights are stored flat in the protobuf; reshape to
    # (outputChannels, kernelChannels, kh, kw).
    weights = etab.new_const(np.array(list(op.weights.floatValue)).reshape(
        tuple([op.outputChannels, op.kernelChannels] + list(op.kernelSize))))
    dilation = list(op.dilationFactor)
    if not dilation:
        dilation = [1, 1]
    params = {'channels': op.outputChannels,
              'kernel_size': list(op.kernelSize),
              'strides': list(op.stride),
              'dilation': dilation,
              'groups': op.nGroups}

    padding_type = op.WhichOneof('ConvolutionPaddingType')
    if padding_type == 'valid':
        valid = op.valid
        padding = [b.startEdgeSize for b in valid.paddingAmounts.borderAmounts]
        padding2 = [b.endEdgeSize for b in valid.paddingAmounts.borderAmounts]
        for i, j in zip(padding, padding2):
            assert i == j, "Asymmetry padding not supported"
        if padding:
            params['padding'] = padding
    elif padding_type == 'same':
        kernel = params['kernel_size']
        # Same padding: distribute (kernel - 1) across the two edges.
        pad_h = kernel[0] - 1
        pad_w = kernel[1] - 1
        pad_t = pad_h // 2
        pad_l = pad_w // 2
        pad_b = pad_h - pad_t
        pad_r = pad_w - pad_l
        assert pad_t == pad_r and pad_l == pad_b, "Asymmetry padding not supported"
        params['padding'] = [pad_t, pad_l]
    else:
        # BUG FIX: the old message ("Valid/Same convolution padding
        # implemented") was misleading; report the unsupported padding type
        # instead, consistent with the pooling converter below.
        raise tvm.error.OpAttributeUnimplemented(
            'Padding type {} is not supported in operator Convolution.'.format(padding_type))

    # consume padding layer
    if etab.in_padding:
        params['padding'] = [sum(x) for x in zip(params.get('padding', [0, 0]), etab.paddings)]
        etab.clear_padding()

    if op.isDeconvolution:
        ret = _op.nn.conv2d_transpose(data=inexpr, weight=weights, **params)
    else:
        ret = _op.nn.conv2d(data=inexpr, weight=weights, **params)
    if op.hasBias:
        biases = etab.new_const(list(op.bias.floatValue))
        ret = _op.nn.bias_add(ret, biases)

    return ret


def _BatchnormLayerParams(op, inexpr, etab):
    """Convert a CoreML batchnorm layer to a Relay ``batch_norm``."""
    if op.instanceNormalization:
        raise tvm.error.OpNotImplemented(
            'Operator "instance normalization" is not supported in frontend CoreML.')
    gamma = etab.new_const(list(op.gamma.floatValue))
    beta = etab.new_const(list(op.beta.floatValue))
    moving_mean = etab.new_const(list(op.mean.floatValue))
    moving_var = etab.new_const(list(op.variance.floatValue))
    # batch_norm yields (output, moving_mean, moving_var); only the
    # normalized output is used here.
    return _op.nn.batch_norm(data=inexpr, gamma=gamma, beta=beta,
                             moving_mean=moving_mean, moving_var=moving_var,
                             epsilon=op.epsilon)[0]


def _ActivationParams(op, inexpr, etab):
    """Convert a CoreML ActivationParams layer.

    Dispatches on the protobuf oneof ``NonlinearityType`` and emits the
    matching Relay expression.

    Raises
    ------
    tvm.error.OpNotImplemented
        For activation kinds with no Relay translation here.
    """
    whichActivation = op.WhichOneof('NonlinearityType')
    par = getattr(op, whichActivation)
    if whichActivation == 'linear':
        alpha = _expr.const(par.alpha, dtype='float32')
        beta = _expr.const(par.beta, dtype='float32')
        return _op.add(_op.multiply(inexpr, alpha), beta)
    if whichActivation == 'ReLU':
        return _op.nn.relu(inexpr)
    if whichActivation == 'leakyReLU':
        # BUG FIX: the converted expression was computed but never returned,
        # so this branch fell through and produced None. Also, leaky_relu
        # takes its slope as a plain float attribute, not a const expression.
        return _op.nn.leaky_relu(inexpr, alpha=par.alpha)
    if whichActivation == 'thresholdedReLU':
        alpha_tensor = _op.full_like(inexpr, fill_value=_expr.const(par.alpha, dtype='float32'))
        # BUG FIX: the Relay cast helper is spelled astype, not as_type.
        return _op.multiply(inexpr, _op.greater(inexpr, alpha_tensor).astype('float32'))
    if whichActivation == 'PReLU':
        return _op.nn.prelu(inexpr, alpha=_expr.const(par.alpha, dtype='float32'))
    if whichActivation == 'tanh':
        return _op.tanh(inexpr)
    if whichActivation == 'scaledTanh':
        alpha = _expr.const(par.alpha, dtype='float32')
        beta = _expr.const(par.beta, dtype='float32')
        return _op.multiply(_op.tanh(_op.multiply(inexpr, beta)), alpha)
    if whichActivation == 'sigmoid':
        return _op.sigmoid(inexpr)
    if whichActivation == 'sigmoidHard':
        # hard sigmoid: clip(alpha * x + beta, 0, 1)
        alpha = _expr.const(par.alpha, dtype='float32')
        beta = _expr.const(par.beta, dtype='float32')
        transformX = (alpha * inexpr) + beta
        return _op.clip(transformX, a_min=0., a_max=1.)
    if whichActivation == 'ELU':
        # NOTE(review): this computes alpha * (exp(x) - 1) for every x; a
        # true ELU is the identity for x > 0 — confirm intended semantics.
        return _op.multiply(_op.add(_op.exp(inexpr), _expr.const(-1, dtype='float32')),
                            _expr.const(par.alpha, dtype='float32'))
    if whichActivation == 'softsign':
        # softsign: x / (1 + |x|), with |x| = relu(x) + relu(-x).
        # BUG FIX: 'op.nn.relu' referenced an undefined attribute on the
        # protobuf layer; the op namespace is _op.
        return inexpr / (_expr.const(1, dtype='float32') + (
            _op.nn.relu(inexpr) + _op.nn.relu(_op.negative(inexpr))))
    if whichActivation == 'softplus':
        return _op.log(_op.add(_op.exp(inexpr), _expr.const(1, dtype='float32')))
    if whichActivation == 'parametricSoftplus':
        alpha = list(par.alpha.floatValue)
        # BUG FIX: beta was read from par.alpha instead of par.beta.
        beta = list(par.beta.floatValue)
        if len(alpha) == 1:
            return _op.multiply(_op.log(_op.add(_op.exp(inexpr),
                                                _expr.const(beta[0], dtype='float32'))),
                                _expr.const(alpha[0], dtype='float32'))
        # Per-channel alpha/beta: reshape to (C, 1, 1) to broadcast over H x W.
        alpha = np.array(alpha).reshape((len(alpha), 1, 1))
        beta = np.array(beta).reshape((len(beta), 1, 1))
        alpha_expr = etab.new_const(alpha)
        beta_expr = etab.new_const(beta)
        return _op.multiply(_op.log(_op.add(_op.exp(inexpr), beta_expr)), alpha_expr)
    raise tvm.error.OpNotImplemented(
        'Operator {} is not supported in frontend CoreML.'.format(whichActivation))


def _ScaleLayerParams(op, inexpr, etab):
    """Elementwise scale with an optional bias; constants get trailing
    (1, 1) dims so they broadcast over the spatial axes."""
    scale_shape = tuple(list(op.shapeScale) + [1, 1])
    scale = etab.new_const(
        np.array(list(op.scale.floatValue)).reshape(scale_shape))
    out = _op.multiply(inexpr, scale)
    if op.hasBias:
        bias_shape = tuple(list(op.shapeBias) + [1, 1])
        bias = etab.new_const(
            np.array(list(op.bias.floatValue)).reshape(bias_shape))
        out = _op.add(out, bias)
    return out


def _PoolingLayerParams(op, inexpr, etab):
    """Convert a CoreML pooling layer (max/average, global or windowed)."""
    if op.globalPooling:
        if op.type == 0:
            return _op.nn.global_max_pool2d(inexpr)
        if op.type == 1:
            return _op.nn.global_avg_pool2d(inexpr)
        raise tvm.error.OpNotImplemented(
            'Only Max and Average Pooling are supported in frontend CoreML.')

    params = {'pool_size': list(op.kernelSize),
              'strides': list(op.stride)}

    padding_type = op.WhichOneof('PoolingPaddingType')
    if padding_type == 'valid':
        amounts = op.valid.paddingAmounts.borderAmounts
        begins = [b.startEdgeSize for b in amounts]
        ends = [b.endEdgeSize for b in amounts]
        for begin, end in zip(begins, ends):
            assert begin == end
        params['padding'] = begins
    elif padding_type == 'includeLastPixel':
        # I don't know if this is correct
        params['padding'] = list(op.includeLastPixel.paddingAmounts)
        params['ceil_mode'] = True
    else:
        msg = 'PoolingPaddingType {} is not supported in operator Pooling.'
        raise tvm.error.OpAttributeUnimplemented(msg.format(padding_type))

    # Fold any pending padding layer into this pooling op.
    if etab.in_padding:
        params['padding'] = [sum(x) for x in zip(
            params.get('padding', [0, 0]), etab.paddings)]
        etab.clear_padding()

    if op.type == 0:
        return _op.nn.max_pool2d(inexpr, **params)
    if op.type == 1:
        return _op.nn.avg_pool2d(inexpr, **params)
    raise tvm.error.OpNotImplemented(
        'Only Max and Average Pooling are supported in CoreML.')


def _SoftmaxLayerParams(op, inexpr, etab):
    """Flatten to (batch, features), then apply softmax."""
    flattened = _op.nn.batch_flatten(inexpr)
    return _op.nn.softmax(flattened)


def _InnerProductLayerParams(op, inexpr, etab):
    """Fully-connected layer: dense matmul plus an optional bias add."""
    # Flat weight vector reshaped to (outputChannels, inputChannels).
    weight_array = np.array(op.weights.floatValue).reshape(
        (op.outputChannels, op.inputChannels))
    weight = etab.new_const(weight_array)
    ret = _op.nn.dense(data=inexpr, weight=weight, units=op.outputChannels)
    if op.hasBias:
        ret = _op.nn.bias_add(ret, etab.new_const(np.array(op.bias.floatValue)))
    return ret


def _AddLayerParams(op, inexpr, etab):
    """Elementwise sum of all inputs plus the scalar ``op.alpha`` offset.

    Parameters mirror the other converters: ``inexpr`` may be a single
    expression or a list of them.
    """
    if not isinstance(inexpr, list):
        inexpr = [inexpr]
    ret = inexpr[0]
    for expr in inexpr[1:]:
        ret = _op.add(ret, expr)
    # BUG FIX: the old test was 'op.alpha > 0', which silently dropped a
    # negative offset; any non-zero alpha must be added (compare the
    # 'alpha != 1' test in _MultiplyLayerParams).
    if op.alpha != 0:
        ret = _op.add(ret, _expr.const(op.alpha, dtype='float32'))
    return ret


def _MultiplyLayerParams(op, inexpr, etab):
    """Elementwise product of all inputs, scaled by ``op.alpha`` when it
    differs from 1."""
    if not isinstance(inexpr, list):
        inexpr = [inexpr]
    product = inexpr[0]
    for factor in inexpr[1:]:
        product = _op.multiply(product, factor)
    if op.alpha != 1:
        product = _op.multiply(product, _expr.const(op.alpha, dtype='float32'))
    return product


def _ConcatLayerParams(op, inexpr, etab):
    """Concatenate the inputs along the channel axis (axis 1, NCHW)."""
    if not isinstance(inexpr, list):
        inexpr = [inexpr]
    if op.sequenceConcat:
        raise tvm.error.OpNotImplemented(
            'Operator Sequence Concat is not supported in frontend CoreML.')
    return _op.concatenate(inexpr, axis=1)


def _FlattenLayerParams(op, inexpr, etab):
    """Flatten to (batch, features); mode 1 flattens in channel-last order."""
    if op.mode == 1:
        # Move channels last before flattening: (N, C, H*W) -> (N, H*W, C).
        reshaped = _op.reshape(inexpr, newshape=(0, 0, -1))
        inexpr = _op.transpose(reshaped, axes=(0, 2, 1))
    return _op.nn.batch_flatten(inexpr)


def _PaddingLayerParams(op, inexpr, etab):
    """Record symmetric zero-padding on ``etab`` for the next conv/pool to
    consume; the input expression is passed through unchanged."""
    if op.WhichOneof('PaddingType') != 'constant':
        raise tvm.error.OpNotImplemented(
            'Non-constant padding is not supported in frontend CoreML.')
    constant = op.constant
    if constant.value != 0:
        raise tvm.error.OpAttributeUnimplemented(
            '{} is not supported in operator Padding.'.format(constant.value))
    begins = [b.startEdgeSize for b in op.paddingAmounts.borderAmounts]
    ends = [b.endEdgeSize for b in op.paddingAmounts.borderAmounts]
    for begin, end in zip(begins, ends):
        assert begin == end
    etab.set_padding(begins)
    return inexpr


def _PermuteLayerParams(op, inexpr, etab):
    """Transpose the input according to the layer's axis order."""
    return _op.transpose(inexpr, axes=tuple(op.axis))


def _UpsampleLayerParams(op, inexpr, etab):
    """Upsample H and W by one common factor (nearest-neighbor or bilinear)."""
    scale_h, scale_w = op.scalingFactor[0], op.scalingFactor[1]
    if scale_h != scale_w:
        raise tvm.error.OpAttributeUnimplemented(
            'Upsample height and width must be equal.')
    method = 'BILINEAR' if op.mode != 0 else 'NEAREST_NEIGHBOR'
    return _op.nn.upsampling(inexpr, scale=scale_h, method=method)


def _L2NormalizeLayerParams(op, inexpr, etab):
    """L2-normalize along the channel axis with the layer's epsilon."""
    eps = op.epsilon
    return _op.nn.l2_normalize(inexpr, eps=eps, axis=[1])


def _LRNLayerParams(op, inexpr, etab):
    """Local response normalization over channels (axis 1, NCHW layout)."""
    return _op.nn.lrn(data=inexpr,
                      size=op.localSize,
                      bias=op.k,
                      alpha=op.alpha,
                      beta=op.beta,
                      axis=1)  # default layout is nchw


def _AverageLayerParams(op, inexpr, etab):
    """Elementwise mean of two or more inputs.

    Raises ValueError unless at least two inputs are supplied.
    """
    if not isinstance(inexpr, list) or len(inexpr) < 2:
        raise ValueError("Expect minimum 2 inputs")
    total = inexpr[0]
    for expr in inexpr[1:]:
        total = _op.add(total, expr)
    return total / _expr.const(len(inexpr), dtype='float32')


def _MaxLayerParams(op, inexpr, etab):
    """Elementwise maximum of two or more inputs.

    Raises ValueError unless at least two inputs are supplied.
    """
    if not isinstance(inexpr, list) or len(inexpr) < 2:
        raise ValueError("Expect minimum 2 inputs")
    result = inexpr[0]
    for expr in inexpr[1:]:
        result = _op.maximum(result, expr)
    return result


def _MinLayerParams(op, inexpr, etab):
    """Elementwise minimum of two or more inputs.

    Raises ValueError unless at least two inputs are supplied.
    """
    if not isinstance(inexpr, list) or len(inexpr) < 2:
        raise ValueError("Expect minimum 2 inputs")
    result = inexpr[0]
    for expr in inexpr[1:]:
        result = _op.minimum(result, expr)
    return result


# Mapping from CoreML layer-parameter protobuf class names to the
# conversion functions defined above; consulted by coreml_op_to_relay.
_convert_map = {
    'NeuralNetworkMeanImage': _NeuralNetworkMeanImage,
    'NeuralNetworkImageScaler': _NeuralNetworkImageScaler,
    'ConvolutionLayerParams': _ConvolutionLayerParams,
    'BatchnormLayerParams': _BatchnormLayerParams,
    'ActivationParams': _ActivationParams,
    'ScaleLayerParams': _ScaleLayerParams,
    'PoolingLayerParams': _PoolingLayerParams,
    'SoftmaxLayerParams': _SoftmaxLayerParams,
    'InnerProductLayerParams': _InnerProductLayerParams,
    'AddLayerParams': _AddLayerParams,
    'MultiplyLayerParams': _MultiplyLayerParams,
    'FlattenLayerParams': _FlattenLayerParams,
    'ConcatLayerParams': _ConcatLayerParams,
    'PaddingLayerParams': _PaddingLayerParams,
    'PermuteLayerParams': _PermuteLayerParams,
    'UpsampleLayerParams': _UpsampleLayerParams,
    'L2NormalizeLayerParams': _L2NormalizeLayerParams,
    'LRNLayerParams': _LRNLayerParams,
    'AverageLayerParams': _AverageLayerParams,
    'MaxLayerParams': _MaxLayerParams,
    'MinLayerParams': _MinLayerParams,
}


def coreml_op_to_relay(op, inname, outname, etab):
    """Convert coreml layer to a Relay expression and update the expression table.

    Parameters
    ----------
    op: a coreml protobuf bit

    inname : str or list of str
        Name of the input Relay expression.

    outname : str
        Name of the output Relay expression.

    etab : relay.frontend.common.ExprTable
        The global expression table to be updated.
    """
    classname = type(op).__name__
    if classname not in _convert_map:
        raise tvm.error.OpNotImplemented(
            'Operator {} is not supported in frontend CoreML.'.format(classname))
    if isinstance(inname, _base.string_types):
        insym = etab.get_expr(inname)
    else:
        insym = [etab.get_expr(name) for name in inname]
    outexpr = _convert_map[classname](op, insym, etab)
    if outname:
        etab.set_expr(outname, outexpr)
    # A padding layer only records state on etab; any other layer type must
    # have consumed a pending padding via its conv/pool converter.
    if classname != 'PaddingLayerParams':
        assert not etab.in_padding, "Previous padding not consumed by conv/pool"


def from_coreml(model, shape=None):
    """Convert from coreml model into Relay Function.

    Parameters
    ----------
    model:
        coremltools.models.MLModel of a NeuralNetworkClassifier

    shape : dict of str to int list/tuple, optional
        The input shapes

    Returns
    -------
    func : tvm.relay.Function
        Compatible relay Function.

    params : dict of str to tvm.NDArray
        The parameter dict to be used by Relay.
    """
    try:
        import coremltools as cm
    except ImportError:
        raise ImportError('The coremltools package must be installed')

    assert isinstance(model, cm.models.MLModel)
    spec = model.get_spec()
    modeltype = spec.WhichOneof('Type')
    assert modeltype in ['neuralNetworkClassifier', 'neuralNetwork', 'neuralNetworkRegressor']
    net = getattr(spec, modeltype)

    etab = ExprTable()
    # Declare a free variable for every model input.
    for inp in spec.description.input:
        input_shape = None
        if shape is not None and inp.name in shape:
            input_shape = shape[inp.name]
        etab.set_expr(inp.name, _expr.var(inp.name, shape=input_shape))

    # Apply preprocessing layers in place of the inputs they transform.
    for pp in net.preprocessing:
        whichpp = pp.WhichOneof('preprocessor')
        ppmethod = getattr(pp, whichpp)
        # the NeuralNetworkImageScalar doesn't seem to have a featureName?
        if whichpp == 'scaler':
            for inp in spec.description.input:
                coreml_op_to_relay(ppmethod, inp.name, inp.name, etab)
        else:
            coreml_op_to_relay(ppmethod, pp.featureName, pp.featureName, etab)

    # Convert every layer, threading expressions through the table by name.
    for layer in net.layers:
        layertype = layer.WhichOneof('layer')
        layerop = getattr(layer, layertype)
        assert len(layer.output) == 1
        if len(layer.input) == 1:
            coreml_op_to_relay(layerop, layer.input[0], layer.output[0], etab)
        else:
            coreml_op_to_relay(layerop, list(layer.input), layer.output[0], etab)

    outexpr = [etab.get_expr(o.name) if o.name in etab.exprs else _expr.var(o.name)
               for o in spec.description.output]
    # for now return first output
    outexpr = outexpr[0]
    func = _expr.Function(ir_pass.free_vars(outexpr), outexpr)
    params = {name: _nd.array(np.array(value, dtype=np.float32))
              for name, value in etab.params.items()}
    return func, params
